package com.doit.day02;

import com.doit.day01.MyConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

/**
 * 1.每5s输出一次当前来了多少用户(去重)  uv
 * 2.将每条数据添加一个字段来标识，如果这个用户的id是第一次出现，那么就标注1，否则就是0
 */
public class _03_练习之消费者组在均衡策略观察3 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MyConfig.HOST_AND_PORT);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g02");
        //选配的
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        //按照哪一种消费策略操作
        props.setProperty("partition.assignment.strategy","org.apache.kafka.clients.consumer.CooperativeStickyAssignor");


        //先将数据消费出来
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList("reb-1", "reb-2"), new ConsumerRebalanceListener() {
            /**
             * 在均衡分配之前，该消费者消费的分区有哪些
             */
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("分配前的分区是："+partitions);

            }

            /**
             * 在均衡分配之后，该消费者消费的分区是哪些
             */
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("分配后的分区是："+partitions);
            }
        });

        while (true){
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        }
    }
}
